package com.it.windows;

import com.it.operator.utils.SourceUtils;
import com.it.pojo.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

/**
 * @author code1997
 */
public class WindowReduceTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator<Event> eventSource = SourceUtils.getEventSource(executionEnvironment)
                //调用flink内置的针对于有序流的watermark策略
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                }));
        SingleOutputStreamOperator<Tuple2<String, Long>> users = eventSource.map(event -> Tuple2.of(event.user, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));
        SingleOutputStreamOperator<Tuple2<String, Long>> result = users.keyBy(user -> user.f0)
                //滑动事件时间窗口
                //.window(SlidingEventTimeWindows.of(Time.hours(1)))
                //滚动事件时间窗口：默认是整点，可以通过offset来进行调整
                .window(TumblingEventTimeWindows.of(Time.seconds(10L)))
                //规约函数:
                .reduce((ReduceFunction<Tuple2<String, Long>>) (value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1)).returns(Types.TUPLE(Types.STRING, Types.LONG));
        result.print();
        executionEnvironment.execute();
    }
}
